计算当前行中的值与每组第一行中的值之间的差异 - pyspark
Calculate difference between value in current row and value in first row per group - pyspark
我有这个数据框:
DataFrame[date: string, t: string, week: string, a: bigint, b: bigint]
具有以下数据:
+---------+--+--------+---+---+
|date |t |week |a |b |
+---------+--+--------+---+---+
|20180328 |1 |2018-W10|31 |35 |
|20180328 |1 |2018-W11|18 |37 |
|20180328 |1 |2018-W12|19 |37 |
|20180328 |1 |2018-W13|19 |38 |
|20180328 |1 |2018-W14|20 |38 |
|20180328 |1 |2018-W15|22 |39 |
|20180328 |1 |2018-W16|23 |39 |
|20180328 |1 |2018-W17|24 |40 |
|20180328 |1 |2018-W18|25 |40 |
|20180328 |1 |2018-W19|25 |41 |
|20180328 |1 |2018-W20|26 |41 |
|20180328 |1 |2018-W21|26 |41 |
|20180328 |1 |2018-W22|26 |41 |
|20180328 |2 |2018-W10|14 |26 |
|20180328 |2 |2018-W11|82 |33 |
|20180328 |2 |2018-W12|87 |36 |
|20180328 |2 |2018-W13|89 |39 |
|20180328 |2 |2018-W14|10 |45 |
|20180328 |2 |2018-W15|10 |45 |
|20180328 |2 |2018-W16|11 |48 |
|20180328 |2 |2018-W17|11 |55 |
|20180328 |2 |2018-W18|11 |60 |
|20180328 |2 |2018-W19|11 |70 |
|20180328 |2 |2018-W20|11 |79 |
|20180328 |2 |2018-W21|11 |86 |
|20180328 |2 |2018-W22|12 |93 |
+---------+--+--------+---+---+
我想添加一个新列,对于每个日期和类型(列 t
),该行与列 b
的该日期的第一周之间的差异。
像这样:
+---------+--+--------+---+---+---+
|date |t |week |a |b |h |
+---------+--+--------+---+---+---+
|20180328 |1 |2018-W10|31 |35 |0 |
|20180328 |1 |2018-W11|18 |37 |2 |
|20180328 |1 |2018-W12|19 |37 |2 |
|20180328 |1 |2018-W13|19 |38 |3 |
|20180328 |1 |2018-W14|20 |38 |3 |
|20180328 |1 |2018-W15|22 |39 |4 |
|20180328 |1 |2018-W16|23 |39 |4 |
|20180328 |1 |2018-W17|24 |40 |5 |
|20180328 |1 |2018-W18|25 |40 |5 |
|20180328 |1 |2018-W19|25 |41 |6 |
|20180328 |1 |2018-W20|26 |41 |6 |
|20180328 |1 |2018-W21|26 |41 |6 |
|20180328 |1 |2018-W22|26 |41 |6 |
|20180328 |2 |2018-W10|14 |26 |0 |
|20180328 |2 |2018-W11|82 |33 |7 |
|20180328 |2 |2018-W12|87 |36 |10 |
|20180328 |2 |2018-W13|89 |39 |13 |
|20180328 |2 |2018-W14|10 |45 |19 |
|20180328 |2 |2018-W15|10 |45 |19 |
|20180328 |2 |2018-W16|11 |48 |22 |
|20180328 |2 |2018-W17|11 |55 |29 |
|20180328 |2 |2018-W18|11 |60 |34 |
|20180328 |2 |2018-W19|11 |70 |44 |
|20180328 |2 |2018-W20|11 |79 |53 |
|20180328 |2 |2018-W21|11 |86 |60 |
|20180328 |2 |2018-W22|12 |93 |67 |
+---------+--+--------+---+---+---+
h 列中的每个数字都是 col('b') 中的值 - col('b') 中该类型在 W10 的值。
您可以使用 pyspark.sql.Window
.
按列 't'
分区并按列 'week'
排序。这是有效的,因为对您的周列进行排序将按字典顺序排序,并且 'W10'
将是您的组的第一个值。如果不是这种情况,您将需要找到另一种方法来对列进行排序,以便顺序是您想要的。
这是一个简化的例子。
data = [
('20180328',1,'2018-W10',31,35),
('20180328',1,'2018-W11',18,37),
('20180328',1,'2018-W12',19,37),
('20180328',1,'2018-W13',19,38),
('20180328',1,'2018-W14',20,38),
('20180328',2,'2018-W10',14,26),
('20180328',2,'2018-W11',82,33),
('20180328',2,'2018-W12',87,36),
('20180328',2,'2018-W13',89,39)
]
df = sqlCtx.createDataFrame(data, ['date', 't', 'week', 'a', 'b'])
df.show()
#+--------+---+--------+---+---+
#| date| t| week| a| b|
#+--------+---+--------+---+---+
#|20180328| 1|2018-W10| 31| 35|
#|20180328| 1|2018-W11| 18| 37|
#|20180328| 1|2018-W12| 19| 37|
#|20180328| 1|2018-W13| 19| 38|
#|20180328| 1|2018-W14| 20| 38|
#|20180328| 2|2018-W10| 14| 26|
#|20180328| 2|2018-W11| 82| 33|
#|20180328| 2|2018-W12| 87| 36|
#|20180328| 2|2018-W13| 89| 39|
#+--------+---+--------+---+---+
使用 pyspark DataFrame 函数
定义 Window:
from pyspark.sql import Window
w = Window.partitionBy('t').orderBy('week')
使用 Window 创建新列:
import pyspark.sql.functions as f
df = df.select('*', (f.col('b') - f.first('b').over(w)).alias('h'))
df.show()
#+--------+---+--------+---+---+---+
#| date| t| week| a| b| h|
#+--------+---+--------+---+---+---+
#|20180328| 1|2018-W10| 31| 35| 0|
#|20180328| 1|2018-W11| 18| 37| 2|
#|20180328| 1|2018-W12| 19| 37| 2|
#|20180328| 1|2018-W13| 19| 38| 3|
#|20180328| 1|2018-W14| 20| 38| 3|
#|20180328| 2|2018-W10| 14| 26| 0|
#|20180328| 2|2018-W11| 82| 33| 7|
#|20180328| 2|2018-W12| 87| 36| 10|
#|20180328| 2|2018-W13| 89| 39| 13|
#+--------+---+--------+---+---+---+
使用pyspark-sql
这是使用 pyspark-sql 的等效操作:
df.registerTempTable('myTable')
df = sqlCtx.sql(
"SELECT *, (b - FIRST(b) OVER (PARTITION BY t ORDER BY week)) AS h FROM myTable"
)
df.show()
#+--------+---+--------+---+---+---+
#| date| t| week| a| b| h|
#+--------+---+--------+---+---+---+
#|20180328| 1|2018-W10| 31| 35| 0|
#|20180328| 1|2018-W11| 18| 37| 2|
#|20180328| 1|2018-W12| 19| 37| 2|
#|20180328| 1|2018-W13| 19| 38| 3|
#|20180328| 1|2018-W14| 20| 38| 3|
#|20180328| 2|2018-W10| 14| 26| 0|
#|20180328| 2|2018-W11| 82| 33| 7|
#|20180328| 2|2018-W12| 87| 36| 10|
#|20180328| 2|2018-W13| 89| 39| 13|
#+--------+---+--------+---+---+---+
相关
我有这个数据框:
DataFrame[date: string, t: string, week: string, a: bigint, b: bigint]
具有以下数据:
+---------+--+--------+---+---+
|date |t |week |a |b |
+---------+--+--------+---+---+
|20180328 |1 |2018-W10|31 |35 |
|20180328 |1 |2018-W11|18 |37 |
|20180328 |1 |2018-W12|19 |37 |
|20180328 |1 |2018-W13|19 |38 |
|20180328 |1 |2018-W14|20 |38 |
|20180328 |1 |2018-W15|22 |39 |
|20180328 |1 |2018-W16|23 |39 |
|20180328 |1 |2018-W17|24 |40 |
|20180328 |1 |2018-W18|25 |40 |
|20180328 |1 |2018-W19|25 |41 |
|20180328 |1 |2018-W20|26 |41 |
|20180328 |1 |2018-W21|26 |41 |
|20180328 |1 |2018-W22|26 |41 |
|20180328 |2 |2018-W10|14 |26 |
|20180328 |2 |2018-W11|82 |33 |
|20180328 |2 |2018-W12|87 |36 |
|20180328 |2 |2018-W13|89 |39 |
|20180328 |2 |2018-W14|10 |45 |
|20180328 |2 |2018-W15|10 |45 |
|20180328 |2 |2018-W16|11 |48 |
|20180328 |2 |2018-W17|11 |55 |
|20180328 |2 |2018-W18|11 |60 |
|20180328 |2 |2018-W19|11 |70 |
|20180328 |2 |2018-W20|11 |79 |
|20180328 |2 |2018-W21|11 |86 |
|20180328 |2 |2018-W22|12 |93 |
+---------+--+--------+---+---+
我想添加一个新列,对于每个日期和类型(列 t
),该行与列 b
的该日期的第一周之间的差异。
像这样:
+---------+--+--------+---+---+---+
|date |t |week |a |b |h |
+---------+--+--------+---+---+---+
|20180328 |1 |2018-W10|31 |35 |0 |
|20180328 |1 |2018-W11|18 |37 |2 |
|20180328 |1 |2018-W12|19 |37 |2 |
|20180328 |1 |2018-W13|19 |38 |3 |
|20180328 |1 |2018-W14|20 |38 |3 |
|20180328 |1 |2018-W15|22 |39 |4 |
|20180328 |1 |2018-W16|23 |39 |4 |
|20180328 |1 |2018-W17|24 |40 |5 |
|20180328 |1 |2018-W18|25 |40 |5 |
|20180328 |1 |2018-W19|25 |41 |6 |
|20180328 |1 |2018-W20|26 |41 |6 |
|20180328 |1 |2018-W21|26 |41 |6 |
|20180328 |1 |2018-W22|26 |41 |6 |
|20180328 |2 |2018-W10|14 |26 |0 |
|20180328 |2 |2018-W11|82 |33 |7 |
|20180328 |2 |2018-W12|87 |36 |10 |
|20180328 |2 |2018-W13|89 |39 |13 |
|20180328 |2 |2018-W14|10 |45 |19 |
|20180328 |2 |2018-W15|10 |45 |19 |
|20180328 |2 |2018-W16|11 |48 |22 |
|20180328 |2 |2018-W17|11 |55 |29 |
|20180328 |2 |2018-W18|11 |60 |34 |
|20180328 |2 |2018-W19|11 |70 |44 |
|20180328 |2 |2018-W20|11 |79 |53 |
|20180328 |2 |2018-W21|11 |86 |60 |
|20180328 |2 |2018-W22|12 |93 |67 |
+---------+--+--------+---+---+---+
h 列中的每个数字都是 col('b') 中的值 - col('b') 中该类型在 W10 的值。
您可以使用 pyspark.sql.Window
.
按列 't'
分区并按列 'week'
排序。这是有效的,因为对您的周列进行排序将按字典顺序排序,并且 'W10'
将是您的组的第一个值。如果不是这种情况,您将需要找到另一种方法来对列进行排序,以便顺序是您想要的。
这是一个简化的例子。
data = [
('20180328',1,'2018-W10',31,35),
('20180328',1,'2018-W11',18,37),
('20180328',1,'2018-W12',19,37),
('20180328',1,'2018-W13',19,38),
('20180328',1,'2018-W14',20,38),
('20180328',2,'2018-W10',14,26),
('20180328',2,'2018-W11',82,33),
('20180328',2,'2018-W12',87,36),
('20180328',2,'2018-W13',89,39)
]
df = sqlCtx.createDataFrame(data, ['date', 't', 'week', 'a', 'b'])
df.show()
#+--------+---+--------+---+---+
#| date| t| week| a| b|
#+--------+---+--------+---+---+
#|20180328| 1|2018-W10| 31| 35|
#|20180328| 1|2018-W11| 18| 37|
#|20180328| 1|2018-W12| 19| 37|
#|20180328| 1|2018-W13| 19| 38|
#|20180328| 1|2018-W14| 20| 38|
#|20180328| 2|2018-W10| 14| 26|
#|20180328| 2|2018-W11| 82| 33|
#|20180328| 2|2018-W12| 87| 36|
#|20180328| 2|2018-W13| 89| 39|
#+--------+---+--------+---+---+
使用 pyspark DataFrame 函数
定义 Window:
from pyspark.sql import Window
w = Window.partitionBy('t').orderBy('week')
使用 Window 创建新列:
import pyspark.sql.functions as f
df = df.select('*', (f.col('b') - f.first('b').over(w)).alias('h'))
df.show()
#+--------+---+--------+---+---+---+
#| date| t| week| a| b| h|
#+--------+---+--------+---+---+---+
#|20180328| 1|2018-W10| 31| 35| 0|
#|20180328| 1|2018-W11| 18| 37| 2|
#|20180328| 1|2018-W12| 19| 37| 2|
#|20180328| 1|2018-W13| 19| 38| 3|
#|20180328| 1|2018-W14| 20| 38| 3|
#|20180328| 2|2018-W10| 14| 26| 0|
#|20180328| 2|2018-W11| 82| 33| 7|
#|20180328| 2|2018-W12| 87| 36| 10|
#|20180328| 2|2018-W13| 89| 39| 13|
#+--------+---+--------+---+---+---+
使用pyspark-sql
这是使用 pyspark-sql 的等效操作:
df.registerTempTable('myTable')
df = sqlCtx.sql(
"SELECT *, (b - FIRST(b) OVER (PARTITION BY t ORDER BY week)) AS h FROM myTable"
)
df.show()
#+--------+---+--------+---+---+---+
#| date| t| week| a| b| h|
#+--------+---+--------+---+---+---+
#|20180328| 1|2018-W10| 31| 35| 0|
#|20180328| 1|2018-W11| 18| 37| 2|
#|20180328| 1|2018-W12| 19| 37| 2|
#|20180328| 1|2018-W13| 19| 38| 3|
#|20180328| 1|2018-W14| 20| 38| 3|
#|20180328| 2|2018-W10| 14| 26| 0|
#|20180328| 2|2018-W11| 82| 33| 7|
#|20180328| 2|2018-W12| 87| 36| 10|
#|20180328| 2|2018-W13| 89| 39| 13|
#+--------+---+--------+---+---+---+
相关